package com.iteaj.iot.test.mqtt;

import com.iteaj.iot.Message;
import com.iteaj.iot.client.ClientProtocolHandle;
import com.iteaj.iot.client.IotClientBootstrap;
import com.iteaj.iot.client.mqtt.MqttClient;
import com.iteaj.iot.client.mqtt.MqttConnectProperties;
import com.iteaj.iot.config.ConnectProperties;
import com.iteaj.iot.consts.ExecStatus;
import com.iteaj.iot.test.IotTestProperties;
import com.iteaj.iot.test.TestConst;
import com.iteaj.iot.test.TestProtocolType;
import com.iteaj.iot.test.client.fixed.FixedLengthClient;
import com.iteaj.iot.test.client.fixed.FixedLengthClientMessage;
import com.iteaj.iot.test.client.fixed.FixedLengthRequestProtocol;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.StringUtils;

import java.time.Duration;
import java.time.Instant;

/**
 * Test handle for the MQTT client component.
 *
 * <p>Once the bean's properties are set it registers several scheduled jobs on the injected
 * {@link ThreadPoolTaskScheduler}:
 * <ul>
 *     <li>two extra clients carrying a last-will message (one retained, one not) are created
 *     and disconnected again ten seconds later;</li>
 *     <li>when the configured client number is greater than zero, that many additional clients
 *     are created and every registered client periodically publishes a QoS 0 message;</li>
 *     <li>otherwise the default client periodically publishes one message per QoS level.</li>
 * </ul>
 *
 * create time: 2021/9/3
 *
 * @author iteaj
 * @since 1.0
 */
public class MqttClientTestHandle implements ClientProtocolHandle<MqttPublishTestProtocol>, InitializingBean {

    @Autowired
    private ThreadPoolTaskScheduler scheduler;
    @Autowired
    private MqttClientTestComponent component;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    public static final String IotMqttWillClient = "willClient";

    public static final String AT_MOST_ONCE_TOPIC = "/test/atMostOnce";
    public static final String EXACTLY_ONCE_TOPIC = "/test/exactlyOnce";
    public static final String AT_LEAST_ONCE_TOPIC = "/test/atLeastOnce";

    /**
     * No business handling is performed for this test protocol.
     *
     * @param protocol the protocol handed to this handle
     * @return always {@code null}
     */
    @Override
    public Object handle(MqttPublishTestProtocol protocol) {
        return null;
    }

    /**
     * Registers the scheduled test jobs described in the class documentation.
     *
     * @throws Exception declared by {@link InitializingBean}; a {@link ClassCastException} is
     *                   raised if the component config is not a
     *                   {@code IotTestProperties.TestMqttConnectProperties}
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        IotTestProperties.TestMqttConnectProperties config = (IotTestProperties.TestMqttConnectProperties) component.getConfig();

        scheduleWillClients(config);

        if (config.getNum() > 0) {
            scheduleMultiClientTest(config);
        } else {
            scheduleQosTest();
        }
    }

    /**
     * Schedules (20s from now) the creation of the two last-will test clients and, 10s after
     * they were created, their disconnection.
     */
    private void scheduleWillClients(IotTestProperties.TestMqttConnectProperties config) {
        scheduler.schedule(() -> {
            // Test client whose will message is NOT retained
            MqttConnectProperties plainWillConfig = newIdleFreeConfig(config);
            plainWillConfig.setWillTopic("/willTopic/" + plainWillConfig.getClientId());
            plainWillConfig.setWillRetain(false);
            plainWillConfig.setWillMessage("{\"retain\": false}");
            MqttClient plainWillClient = component.createNewClientAndConnect(plainWillConfig);

            // Test client whose will message IS retained
            MqttConnectProperties retainedWillConfig = newIdleFreeConfig(config);
            // The will topic is derived from this client's own id; the previous code used the
            // id of the non-retained config here, which looked like a copy-paste slip.
            retainedWillConfig.setWillTopic("/willTopic/" + retainedWillConfig.getClientId());
            retainedWillConfig.setWillRetain(true);
            retainedWillConfig.setWillMessage("{\"retain\": true}");
            MqttClient retainedWillClient = component.createNewClientAndConnect(retainedWillConfig);

            scheduler.schedule(() -> {
                retainedWillClient.disconnect(false); // disconnect-and-reconnect test
                plainWillClient.disconnect(true); // remove-the-client-directly test
            }, Instant.now().plusSeconds(10));

        }, Instant.now().plusSeconds(20));
    }

    /**
     * Schedules the creation of {@code config.getNum()} additional clients (20s from now) and a
     * fixed-rate job (first run 30s from now, then every second) in which every client known to
     * the component publishes one QoS 0 message.
     */
    private void scheduleMultiClientTest(IotTestProperties.TestMqttConnectProperties config) {
        scheduler.schedule(() -> {
            // Read the number when the job runs, not when it is scheduled
            int clientNum = config.getNum();
            for (int i = 0; i < clientNum; i++) {
                component.createNewClientAndConnect(newIdleFreeConfig(config));
            }
        }, Instant.now().plusSeconds(20));

        scheduler.scheduleAtFixedRate(() -> {
            component.clients().forEach(item -> {
                // At-most-once publish test
                new MqttPublishTestProtocol(MqttQoS.AT_MOST_ONCE, "most").request(((MqttClient) item).getConfig(), protocol -> {
                    final Message.MessageHead head = protocol.requestMessage().getHead();
                    logger.info(TestConst.LOGGER_PROTOCOL_DESC, "mqtt协议", MqttQoS.AT_MOST_ONCE
                            , head.getEquipCode(), head.getMessageId(), protocol.getExecStatus().desc);
                    return null;
                });
            });
        }, Instant.now().plusSeconds(30), Duration.ofSeconds(1));
    }

    /**
     * Schedules a fixed-rate job (first run 18s from now, then every 30s) that publishes one
     * message per QoS level, pausing after each publish.
     */
    private void scheduleQosTest() {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                // At-most-once publish test
                publishAndLog(MqttQoS.AT_MOST_ONCE, AT_MOST_ONCE_TOPIC);
                // NOTE: the client->broker->client->broker->client round-trip test that used to
                // sit here is currently disabled.
                Thread.sleep(1000);

                // At-least-once publish test
                publishAndLog(MqttQoS.AT_LEAST_ONCE, AT_LEAST_ONCE_TOPIC);
                Thread.sleep(2000);

                // Exactly-once publish test
                publishAndLog(MqttQoS.EXACTLY_ONCE, EXACTLY_ONCE_TOPIC);
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the executing thread can observe it
                Thread.currentThread().interrupt();
                logger.warn("mqtt publish test interrupted", e);
            } catch (Exception e) {
                logger.error("mqtt publish test failed", e);
            }
        }, Instant.now().plusSeconds(18), Duration.ofSeconds(30));
    }

    /**
     * Publishes one test message with the given QoS to the given topic (timeout 0) and logs the
     * execution status: info level on success, error level otherwise.
     */
    private void publishAndLog(MqttQoS qos, String topic) {
        new MqttPublishTestProtocol(qos, topic).timeout(0).request(protocol -> {
            final Message.MessageHead head = protocol.requestMessage().getHead();
            if (protocol.getExecStatus() == ExecStatus.success) {
                logger.info(TestConst.LOGGER_PROTOCOL_DESC, "mqtt协议", qos
                        , head.getEquipCode(), head.getMessageId(), "通过");
            } else {
                logger.error(TestConst.LOGGER_PROTOCOL_DESC, "mqtt协议", qos
                        , head.getEquipCode(), head.getMessageId(), protocol.getExecStatus().desc);
            }
            return null;
        });
    }

    /**
     * Creates a new connect config by copying every property of {@code template} except
     * {@code clientId}, and sets all three idle times to 0.
     */
    private static MqttConnectProperties newIdleFreeConfig(IotTestProperties.TestMqttConnectProperties template) {
        MqttConnectProperties properties = new MqttConnectProperties();
        BeanUtils.copyProperties(template, properties, "clientId");
        properties.setAllIdleTime(0);
        properties.setWriterIdleTime(0);
        properties.setReaderIdleTime(0);
        return properties;
    }
}
